iT邦幫忙

2025 iThome 鐵人賽

DAY 5
4
AI & Data

「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」系列 第 5

【知其然,更知其所以然】Day 5: Speed Layer 效能瓶頸與優化

  • 分享至 

  • xImage
  •  

https://ithelp.ithome.com.tw/upload/images/20250825/2012475891A0a0fP4S.jpg
在上一篇我們實作了基礎的 Speed Layer 架構,包含 Source、Sink、SimpleStreamingEngine 三個核心組件。一開始系統運作良好,Consumer 順利處理訂單數據,一切看起來很完美。

但當系統面臨真正的挑戰時 - 比如雙 11 大促銷 - 問題就浮現了:

  • Consumer 處理能力達到瓶頸
  • 訊息開始在佇列中堆積,等待處理

這時候我們發現,效能瓶頸是 Stream Processing 系統必須面對的核心挑戰

常見的 Consumer 效能瓶頸

透過實際經驗,我們歸納出 Consumer 最常見的效能瓶頸。當然,還有其他較少見的問題(如 GC 調優、記憶體洩漏、網路配置等),但以下四種是最主要且最容易遇到的:

1. 資料庫寫入速度不足

問題描述:每筆訊息都執行一次 INSERT,然後立即 commit,對資料庫造成大量小型交易負擔。

影響:資料庫頻繁處理小型交易,無法發揮批量處理的效率優勢。

解決方案:實作批量寫入機制,一次收集數百筆記錄再執行批量 INSERT,大幅減少 transaction commit 次數。

2. JSON 解析與序列化開銷

問題描述:每筆訊息都需要進行 JSON 反序列化處理。

影響:CPU 資源被大量 JSON 解析操作佔用,降低整體處理效率。

解決方案

  • 使用更高效的 JSON 解析庫(如 orjson)
  • 考慮採用二進位格式(Avro、Protobuf)降低解析成本

3. Kafka Partition 數量不足

問題描述:Topic 的 Partition 數量過少,限制了 Consumer 的平行處理能力。

影響:即使增加 Consumer 實例,也無法提升整體吞吐量。

解決方案:適當增加 Partition 數量,讓系統能夠水平擴展。

4. 網路延遲

問題描述:Broker、Consumer、資料庫之間的網路距離造成額外延遲。

影響:每個網路往返都增加處理時間。

解決方案:盡量將相關組件部署在相近的網路位置。

經驗分享:在實務中,問題 2 和 3 相對容易解決,問題 4 在大多數企業基礎設施中也不會是主要瓶頸。真正棘手的往往是問題 1(資料庫寫入速度),這也是我們接下來要深入探討的重點。


優化思路:先過濾再入庫

在解決資料庫寫入瓶頸之前,我們可以先從另一個角度思考:減少需要處理的資料量

核心概念:在資料進入資料庫之前,先過濾掉不需要的資料,只處理真正有價值的訊息。

舉例來說,如果訂單狀態為 'removed' 的資料對業務沒有意義,我們可以在 Consumer 階段就將這些資料過濾掉,避免佔用後續的處理資源。

重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。


DataFrame 抽象層設計

為了實現靈活的資料過濾功能,我們引入 DataFrame 概念。這個抽象層位於 Source 和 Sink 之間,負責資料的轉換和過濾操作。

DataFrame 架構設計

Source → DataFrame → Sink
         (過濾、轉換)

DataFrame 的核心功能:

  • 過濾操作:支援自訂過濾條件
  • 鏈式調用:支援多個過濾條件串接
  • 訊息路由:將處理後的資料發送到指定的 Sink

SimpleDataFrame 核心程式碼一步步講解

Step 1:SimpleDataFrame 初始化

class SimpleDataFrame:
    def __init__(self, name: str = "dataframe"):
        self.name = name
        self._filters = []  # 過濾函數列表
        self._sinks = []    # Sink 列表

核心概念

  • SimpleDataFrame 管理兩個重要清單:過濾器和 Sink
  • 支援鏈式操作,可以串接多個過濾條件

Step 2:filter 方法核心邏輯

def filter(self, func) -> 'SimpleDataFrame':
    # 創建新的 DataFrame 實例以支援鏈式調用
    new_df = SimpleDataFrame(f"{self.name}_filtered")
    new_df._filters = self._filters.copy()  # 複製現有過濾器
    new_df._filters.append(func)  # 添加新過濾器
    new_df._sinks = self._sinks.copy()  # 複製現有 Sink
    return new_df

設計重點

  • 每次 filter 都創建新的 DataFrame 實例
  • 保留舊的過濾器,添加新的過濾條件
  • 支援鏈式調用:df.filter(...).filter(...)

Step 3:sink 方法

def sink(self, sink: BaseSink) -> 'SimpleDataFrame':
    self._sinks.append(sink)
    return self  # 返回自身支援鏈式調用

核心功能:將處理後的資料發送到指定的 Sink。

Step 4:process_message 核心處理邏輯

def process_message(self, message) -> bool:
    # 依序檢查所有過濾器
    for filter_func in self._filters:
        if not filter_func(message):
            return False  # 被過濾掉
    
    # 通過所有過濾器,發送到所有 Sink
    for sink in self._sinks:
        sink.write(message)
    
    return True

處理流程

  1. 訊息依序通過每個過濾器
  2. 如果任何過濾器返回 False,訊息被過濾掉
  3. 通過所有過濾器的訊息會發送到所有 Sink

完整 SimpleDataFrame 程式碼

# 簡單的 DataFrame 類別,支援 filter 算子
import logging
from typing import Any, Dict, Callable, List
from .sink import BaseSink

logger = logging.getLogger(__name__)

class SimpleDataFrame:
    """
    簡單的 DataFrame 類別,支援基本的流處理操作
    """
    
    def __init__(self, name: str = "dataframe"):
        self.name = name
        self._filters: List[Callable[[Dict[str, Any]], bool]] = []  # 過濾函數列表
        self._sinks: List[BaseSink] = []  # Sink 列表
        
    def filter(self, func: Callable[[Dict[str, Any]], bool]) -> 'SimpleDataFrame':
        """
        添加過濾條件
        
        :param func: 過濾函數,接收 message 字典,返回 True 表示通過過濾
        :return: 返回新的 SimpleDataFrame 實例以支援鏈式調用
        """
        # 創建新的 DataFrame 實例以支援鏈式調用
        new_df = SimpleDataFrame(f"{self.name}_filtered")
        new_df._filters = self._filters.copy()  # 複製現有的過濾器
        new_df._filters.append(func)  # 添加新的過濾器
        new_df._sinks = self._sinks.copy()  # 複製現有的 Sink
        
        logger.debug(f"Added filter to {self.name}")
        return new_df
    
    def __getitem__(self, condition: Callable[[Dict[str, Any]], bool]) -> 'SimpleDataFrame':
        """
        使用 [] 語法添加過濾條件,等同於 filter() 方法
        
        :param condition: 過濾函數,接收 message 字典,返回 True 表示通過過濾
        :return: 返回新的 SimpleDataFrame 實例以支援鏈式調用
        """
        return self.filter(condition)
    
    def sink(self, sink: BaseSink) -> 'SimpleDataFrame':
        """
        添加 Sink
        
        :param sink: Sink 實例
        :return: 返回自身以支援鏈式調用
        """
        self._sinks.append(sink)
        logger.debug(f"Added sink {sink.name} to {self.name}")
        return self
    
    def process_message(self, message: Dict[str, Any]) -> bool:
        """
        處理單個訊息,通過所有過濾器後發送到 Sink
        
        :param message: 訊息字典
        :return: True 表示訊息通過了所有過濾器
        """
        # 依序檢查所有過濾器
        for filter_func in self._filters:
            try:
                if not filter_func(message):
                    logger.debug("Message filtered out")
                    return False
            except Exception as e:
                logger.error(f"Error in filter: {e}")
                return False
        
        # 如果通過所有過濾器,發送到所有 Sink
        for sink in self._sinks:
            try:
                sink.write(message)
            except Exception as e:
                logger.error(f"Error writing to sink {sink.name}: {e}")
        
        return True

升級 SimpleStreamingEngine 支援 DataFrame

我們需要調整 SimpleStreamingEngine 的邏輯,讓它能夠支援 DataFrame 的概念。新的架構中,Source 會先將訊息交給對應的 DataFrame 處理,通過過濾與轉換後,再統一輸出到 Sink。

架構調整

原本的架構

    ┌─────────────────────┐
    │SimpleStreamingEngine│  ◄── Central Manager
    │                     │
    │    +add_source()    │
    │    +add_sink()      │
    │    + run()          │
    └─────────────────────┘
           │
           │ manages
           ▼
    ┌──────────────┐    ┌──────────────┐
    │    Source    │───▶│     Sink     │
    │              │    │              │
    │ KafkaSource  │    │PostgreSQLSink│
    └──────────────┘    └──────────────┘

升級後的架構

    ┌─────────────────────┐
    │SimpleStreamingEngine│  ◄── Central Manager
    │                     │
    │    +add_source()    │
    │    +dataframe()     │
    │    + run()          │
    └─────────────────────┘
           │
           │ manages
           ▼
    ┌──────────────┐    ┌──────────────┐    ┌──────────────┐
    │    Source    │───▶│  DataFrame   │───▶│     Sink     │
    │              │    │              │    │              │
    │ KafkaSource  │    │ +filter()    │    │PostgreSQLSink│
    └──────────────┘    └──────────────┘    └──────────────┘

SimpleStreamingEngine 升級重點一步步講解

Step 1:新增 DataFrame 支援

升級後的 SimpleStreamingEngine 需要管理 DataFrame,而不是直接管理 Sink:

class SimpleStreamingEngine:
    def __init__(self, name: str = "simple-streaming-engine"):
        self.name = name
        self._sources = []
        self._dataframes = []  # 新增:管理 DataFrame 列表
        self._source_dataframe_map = {}  # 新增:Source 到 DataFrame 的映射

核心變化

  • 移除直接的 Sink 管理
  • 新增 DataFrame 清單和映射關係

Step 2:dataframe 方法 - 建立 Source 和 DataFrame 的關聯

def dataframe(self, *, source: BaseSource) -> SimpleDataFrame:
    df = SimpleDataFrame(f"df-{source.name}")
    self._dataframes.append(df)
    self._source_dataframe_map[source] = df  # 建立映射
    self.add_source(source)
    return df

關鍵功能

  • 為每個 Source 創建專屬的 DataFrame
  • 建立 Source → DataFrame 的映射關係
  • 返回 DataFrame 供鏈式操作

Step 3:升級 run 方法

def run(self):
    # 設定所有 DataFrame 中的 Sink
    all_sinks = []
    for df in self._dataframes:
        all_sinks.extend(df._sinks)
    
    for sink in all_sinks:
        sink.setup()
    
    # 為每個 Source 設定訊息處理器
    for source in self._sources:
        source.message_handler = self._create_message_handler(source)
        source.run()

處理邏輯調整

  • 從所有 DataFrame 收集 Sink 並初始化
  • 為每個 Source 設定對應 DataFrame 的處理器

Step 4:新的訊息處理器

def _create_message_handler(self, source: BaseSource):
    def message_handler(message):
        if source in self._source_dataframe_map:
            df = self._source_dataframe_map[source]
            df.process_message(message)  # 交給 DataFrame 處理
    
    return message_handler

核心改變

  • 不再直接發送到 Sink
  • 根據映射找到對應的 DataFrame
  • 讓 DataFrame 負責過濾和轉發

完整的升級版 SimpleStreamingEngine

import logging
from typing import List
from .source import BaseSource
from .dataframe import SimpleDataFrame

logger = logging.getLogger(__name__)

class SimpleStreamingEngine:
    """
    支援 DataFrame 的簡單流處理引擎
    """
    
    def __init__(self, name: str = "simple-streaming-engine"):
        self.name = name
        self._sources: List[BaseSource] = []
        self._dataframes: List[SimpleDataFrame] = []
        self._source_dataframe_map = {}
    
    def add_source(self, source: BaseSource):
        """
        添加 Source 到流處理引擎
        """
        self._sources.append(source)
    
    def dataframe(self, *, source: BaseSource) -> SimpleDataFrame:
        """
        創建 DataFrame 並關聯到 Source
        """
        df = SimpleDataFrame(f"df-{source.name}")
        self._dataframes.append(df)
        self._source_dataframe_map[source] = df
        self.add_source(source)
        return df
    
    def run(self):
        """
        啟動流處理引擎,開始處理數據流
        """
        # 設定所有 DataFrame 中的 Sink
        all_sinks = []
        for df in self._dataframes:
            all_sinks.extend(df._sinks)
        
        for sink in all_sinks:
            sink.setup()
        
        # 為每個 Source 設定訊息處理器
        for source in self._sources:
            source.message_handler = self._create_message_handler(source)
            source.run()
    
    def _create_message_handler(self, source: BaseSource):
        """
        創建訊息處理器,將訊息路由到對應的 DataFrame
        """
        def message_handler(message):
            if source in self._source_dataframe_map:
                df = self._source_dataframe_map[source]
                df.process_message(message)
        
        return message_handler

實際應用:過濾 removed 訂單

現在讓我們看看如何實際使用升級後的架構來過濾無效資料。

使用方式一步步說明

Step 1:創建 SimpleStreamingEngine

engine = SimpleStreamingEngine(name="orders-processing-engine")

Step 2:創建 Source 並建立 DataFrame

orders_source = SimpleKafkaSource(name="orders-source", topic="orders")
sdf = engine.dataframe(source=orders_source)

Step 3:添加過濾條件

# 過濾掉 removed 訂單
sdf = sdf.filter(lambda msg: msg.get('value', {}).get('status') != 'removed')

# 或者使用 [] 語法(兩種寫法效果相同)
sdf = sdf[lambda msg: msg.get('value', {}).get('status') != 'removed']

Step 4:添加 Sink 並啟動

pg_sink = SimplePostgreSQLSink(...)
sdf.sink(pg_sink)
engine.run()  # 開始處理:Kafka → 過濾 → PostgreSQL

完整使用範例

# 1. 創建 SimpleStreamingEngine
engine = SimpleStreamingEngine(name="orders-processing-engine")

# 2. 創建 Kafka Source
orders_source = SimpleKafkaSource(name="orders-source", topic="orders")

# 3. 創建 DataFrame 並添加過濾器
sdf = engine.dataframe(source=orders_source)
sdf = sdf.filter(lambda msg: msg.get('value', {}).get('status') != 'removed')

# 4. 添加 PostgreSQL Sink
pg_sink = SimplePostgreSQLSink(
    name="orders-sink",
    table_name="valid_orders",
    # ... 其他參數
)

# 5. 組裝並啟動
sdf.sink(pg_sink)
engine.run()  # 開始處理:Kafka → 過濾 → PostgreSQL

資料流向

KafkaSource → DataFrame.filter() → PostgreSQLSink

這樣的設計讓過濾邏輯變得清晰且易於擴展,我們可以輕鬆添加更多過濾條件或修改處理邏輯。


總結

今天我們探討了 Speed Layer 中常見的效能瓶頸問題,並提出了「先過濾再入庫」的優化思路。

主要收穫

  1. 效能瓶頸識別:了解 Consumer 常見的四大效能問題及其解決方案
  2. DataFrame 抽象:引入 DataFrame 概念,實現靈活的資料過濾和轉換
  3. 架構升級:將 SimpleStreamingEngine 升級為支援 DataFrame 的版本
  4. 實際應用:透過具體範例展示如何過濾無效資料

通過 Source-DataFrame-Sink 的架構設計,我們建立了:

  • 更靈活的資料處理能力
  • 可擴展的過濾機制
  • 更好的系統效能表現

Day 6 預告:批量寫入優化

雖然過濾可以減少資料量,但當業務持續成長時,我們仍然需要面對大量有效資料的處理挑戰。下一篇我們將深入探討批量寫入機制,學習如何通過批量處理技術大幅提升資料庫寫入效能,讓你的 Speed Layer 在高峰期依然能穩定運行。


上一篇
【知其然,更知其所以然】Day 4: 手寫 Consumer 實作 Lambda Architecture Speed Layer
下一篇
【知其然,更知其所以然】Day 6:批量寫入的威力 - 讓 Kafka Consumer 暢行無阻
系列文
「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」15
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言